package org.hornetq.core.client.impl;

import java.io.File;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.utils.Future;
import org.hornetq.utils.HQIterator;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
import org.hornetq.utils.TokenBucketLimiter;

/* loaded from: input_file:WEB-INF/lib/hornetq-core-2.1.2.Final.jar:org/hornetq/core/client/impl/ClientConsumerImpl.class */
public class ClientConsumerImpl implements ClientConsumerInternal {
    public static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
    public static final int NUM_PRIORITIES = 10;
    private final ClientSessionInternal session;
    private final Channel channel;
    private final long id;
    private final SimpleString filterString;
    private final SimpleString queueName;
    private final boolean browseOnly;
    private final Executor sessionExecutor;
    private final int clientWindowSize;
    private final int ackBatchSize;
    private LargeMessageBufferImpl currentLargeMessageBuffer;
    private ClientMessageInternal largeMessageReceived;
    private final TokenBucketLimiter rateLimiter;
    private volatile Thread receiverThread;
    private volatile Thread onMessageThread;
    private volatile MessageHandler handler;
    private volatile boolean closing;
    private volatile boolean closed;
    private volatile int creditsToSend;
    private volatile Exception lastException;
    private volatile int ackBytes;
    private volatile ClientMessageInternal lastAckedMessage;
    private final SessionQueueQueryResponseMessage queueInfo;
    private volatile boolean ackIndividually;
    private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
    private static final boolean trace = log.isTraceEnabled();
    public static final SimpleString FORCED_DELIVERY_MESSAGE = new SimpleString("_hornetq.forced.delivery.seq");
    private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl(10);
    private final Runner runner = new Runner();
    private volatile boolean slowConsumerInitialCreditSent = false;
    private boolean stopped = false;
    private final AtomicLong forceDeliveryCount = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hornetq-core-2.1.2.Final.jar:org/hornetq/core/client/impl/ClientConsumerImpl$Runner.class */
    public class Runner implements Runnable {
        private Runner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ClientConsumerImpl.this.callOnMessage();
            } catch (Exception e) {
                ClientConsumerImpl.log.error("Failed to call onMessage()", e);
                ClientConsumerImpl.this.lastException = e;
            }
        }
    }

    public ClientConsumerImpl(ClientSessionInternal clientSessionInternal, long j, SimpleString simpleString, SimpleString simpleString2, boolean z, int i, int i2, TokenBucketLimiter tokenBucketLimiter, Executor executor, Channel channel, SessionQueueQueryResponseMessage sessionQueueQueryResponseMessage) {
        this.id = j;
        this.queueName = simpleString;
        this.filterString = simpleString2;
        this.browseOnly = z;
        this.channel = channel;
        this.session = clientSessionInternal;
        this.rateLimiter = tokenBucketLimiter;
        this.sessionExecutor = executor;
        this.clientWindowSize = i;
        this.ackBatchSize = i2;
        this.queueInfo = sessionQueueQueryResponseMessage;
    }

    private ClientMessage receive(long j, boolean z) throws HornetQException {
        checkClosed();
        if (this.largeMessageReceived != null) {
            this.largeMessageReceived.discardLargeBody();
            this.largeMessageReceived = null;
        }
        if (this.rateLimiter != null) {
            this.rateLimiter.limit();
        }
        if (this.handler != null) {
            throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot call receive(...) - a MessageHandler is set");
        }
        if (this.clientWindowSize == 0) {
            startSlowConsumer();
        }
        this.receiverThread = Thread.currentThread();
        if (j == 0) {
            j = Long.MAX_VALUE;
        }
        boolean z2 = false;
        long j2 = -1;
        long j3 = j;
        while (true) {
            ClientMessageInternal clientMessageInternal = null;
            try {
                synchronized (this) {
                    while (true) {
                        if (!this.stopped) {
                            ClientMessageInternal removeFirst = this.buffer.removeFirst();
                            clientMessageInternal = removeFirst;
                            if (removeFirst != null) {
                                break;
                            }
                        }
                        if (this.closed || j3 <= 0) {
                            break;
                        }
                        if (j2 == -1) {
                            j2 = System.currentTimeMillis();
                        }
                        if (clientMessageInternal == null && z) {
                            if (this.stopped) {
                                break;
                            }
                            if (!z2) {
                                this.session.forceDelivery(this.id, this.forceDeliveryCount.incrementAndGet());
                                z2 = true;
                            }
                        }
                        try {
                            wait(j3);
                        } catch (InterruptedException e) {
                        }
                        if (clientMessageInternal != null || this.closed) {
                            break;
                        }
                        long currentTimeMillis = System.currentTimeMillis();
                        j3 -= currentTimeMillis - j2;
                        j2 = currentTimeMillis;
                    }
                }
                if (clientMessageInternal == null) {
                    resetIfSlowConsumer();
                    this.receiverThread = null;
                    return null;
                }
                this.session.workDone();
                if (clientMessageInternal.containsProperty(FORCED_DELIVERY_MESSAGE)) {
                    if (clientMessageInternal.getLongProperty(FORCED_DELIVERY_MESSAGE).longValue() < this.forceDeliveryCount.longValue()) {
                        continue;
                    } else if (z) {
                        resetIfSlowConsumer();
                        this.receiverThread = null;
                        return null;
                    }
                }
                boolean isExpired = clientMessageInternal.isExpired();
                flowControlBeforeConsumption(clientMessageInternal);
                if (!isExpired) {
                    if (clientMessageInternal.isLargeMessage()) {
                        this.largeMessageReceived = clientMessageInternal;
                    }
                    ClientMessageInternal clientMessageInternal2 = clientMessageInternal;
                    this.receiverThread = null;
                    return clientMessageInternal2;
                }
                clientMessageInternal.discardLargeBody();
                this.session.expire(this.id, clientMessageInternal.getMessageID());
                if (this.clientWindowSize == 0) {
                    startSlowConsumer();
                }
                if (j3 <= 0) {
                    return null;
                }
            } finally {
                this.receiverThread = null;
            }
        }
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public ClientMessage receive(long j) throws HornetQException {
        return receive(j, false);
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public ClientMessage receive() throws HornetQException {
        return receive(0L, false);
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public ClientMessage receiveImmediate() throws HornetQException {
        return receive(0L, true);
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public MessageHandler getMessageHandler() throws HornetQException {
        checkClosed();
        return this.handler;
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public synchronized void setMessageHandler(MessageHandler messageHandler) throws HornetQException {
        checkClosed();
        if (this.receiverThread != null) {
            throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot set MessageHandler - consumer is in receive(...)");
        }
        boolean z = this.handler == null;
        if (this.handler != messageHandler && this.clientWindowSize == 0) {
            startSlowConsumer();
        }
        this.handler = messageHandler;
        if (this.handler != null && z) {
            requeueExecutors();
        } else {
            if (this.handler != null || z) {
                return;
            }
            waitForOnMessageToComplete(true);
        }
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public void close() throws HornetQException {
        doCleanUp(true);
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void cleanUp() {
        try {
            doCleanUp(false);
        } catch (HornetQException e) {
            log.warn("problem cleaning up: " + this);
        }
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public boolean isClosed() {
        return this.closed;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void stop() throws HornetQException {
        stop(true);
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void stop(boolean z) throws HornetQException {
        waitForOnMessageToComplete(z);
        synchronized (this) {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
        }
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void clearAtFailover() {
        clearBuffer();
        this.lastAckedMessage = null;
        this.creditsToSend = 0;
        this.ackIndividually = false;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public synchronized void start() {
        this.stopped = false;
        requeueExecutors();
    }

    @Override // org.hornetq.api.core.client.ClientConsumer
    public Exception getLastException() {
        return this.lastException;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public SessionQueueQueryResponseMessage getQueueInfo() {
        return this.queueInfo;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public long getID() {
        return this.id;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public SimpleString getFilterString() {
        return this.filterString;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public SimpleString getQueueName() {
        return this.queueName;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public boolean isBrowseOnly() {
        return this.browseOnly;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public synchronized void handleMessage(ClientMessageInternal clientMessageInternal) throws Exception {
        if (this.closing) {
            return;
        }
        if (clientMessageInternal.getAddress() == null) {
            clientMessageInternal.setAddressTransient(this.queueInfo.getAddress());
        }
        clientMessageInternal.onReceipt(this);
        if (clientMessageInternal.getPriority() != 4) {
            this.ackIndividually = true;
        }
        this.buffer.addLast(clientMessageInternal, clientMessageInternal.getPriority());
        if (this.handler == null) {
            notify();
        } else {
            if (this.stopped) {
                return;
            }
            queueExecutor();
        }
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public synchronized void handleLargeMessage(SessionReceiveLargeMessage sessionReceiveLargeMessage) throws Exception {
        if (this.closing) {
            return;
        }
        flowControl(sessionReceiveLargeMessage.getPacketSize(), false);
        ClientMessageImpl clientMessageImpl = new ClientMessageImpl();
        clientMessageImpl.setDeliveryCount(sessionReceiveLargeMessage.getDeliveryCount());
        clientMessageImpl.decodeHeadersAndProperties(HornetQBuffers.wrappedBuffer(sessionReceiveLargeMessage.getLargeMessageHeader()));
        clientMessageImpl.setLargeMessage(true);
        File file = null;
        if (this.session.isCacheLargeMessageClient()) {
            file = File.createTempFile("tmp-large-message-" + clientMessageImpl.getMessageID() + "-", ".tmp");
            file.deleteOnExit();
        }
        this.currentLargeMessageBuffer = new LargeMessageBufferImpl(this, sessionReceiveLargeMessage.getLargeMessageSize(), 60, file);
        clientMessageImpl.setBuffer(this.currentLargeMessageBuffer);
        clientMessageImpl.setFlowControlSize(0);
        handleMessage(clientMessageImpl);
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public synchronized void handleLargeMessageContinuation(SessionReceiveContinuationMessage sessionReceiveContinuationMessage) throws Exception {
        if (this.closing) {
            return;
        }
        this.currentLargeMessageBuffer.addPacket(sessionReceiveContinuationMessage);
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void clear(boolean z) throws HornetQException {
        synchronized (this) {
            HQIterator<ClientMessageInternal> it = this.buffer.iterator();
            while (true) {
                ClientMessageInternal next = it.next();
                if (next != null) {
                    flowControlBeforeConsumption(next);
                } else {
                    clearBuffer();
                }
            }
        }
        waitForOnMessageToComplete(z);
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public int getClientWindowSize() {
        return this.clientWindowSize;
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public int getBufferSize() {
        return this.buffer.size();
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void acknowledge(ClientMessage clientMessage) throws HornetQException {
        ClientMessageInternal clientMessageInternal = (ClientMessageInternal) clientMessage;
        if (this.ackIndividually) {
            if (this.lastAckedMessage != null) {
                flushAcks();
            }
            this.session.individualAcknowledge(this.id, clientMessage.getMessageID());
        } else {
            this.ackBytes += clientMessage.getEncodeSize();
            if (this.ackBytes >= this.ackBatchSize) {
                doAck(clientMessageInternal);
            } else {
                this.lastAckedMessage = clientMessageInternal;
            }
        }
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void flushAcks() throws HornetQException {
        if (this.lastAckedMessage != null) {
            doAck(this.lastAckedMessage);
        }
    }

    @Override // org.hornetq.core.client.impl.ClientConsumerInternal
    public void flowControl(int i, boolean z) throws HornetQException {
        if (this.clientWindowSize >= 0) {
            this.creditsToSend += i;
            if (this.creditsToSend >= this.clientWindowSize) {
                if (this.clientWindowSize != 0 || !z) {
                    if (trace) {
                        log.trace("Sending " + i + " from flow-control");
                    }
                    int i2 = this.creditsToSend;
                    this.creditsToSend = 0;
                    sendCredits(i2);
                    return;
                }
                if (trace) {
                    log.trace("Sending " + this.creditsToSend + " -1, for slow consumer");
                }
                this.slowConsumerInitialCreditSent = false;
                int i3 = this.creditsToSend - 1;
                this.creditsToSend = 0;
                sendCredits(i3);
            }
        }
    }

    private void startSlowConsumer() {
        if (this.slowConsumerInitialCreditSent) {
            return;
        }
        if (trace) {
            log.trace("Sending 1 credit to start delivering of one message to slow consumer");
        }
        this.slowConsumerInitialCreditSent = true;
        sendCredits(1);
    }

    private void resetIfSlowConsumer() {
        if (this.clientWindowSize == 0) {
            this.slowConsumerInitialCreditSent = false;
            sendCredits(0);
        }
    }

    private void requeueExecutors() {
        for (int i = 0; i < this.buffer.size(); i++) {
            queueExecutor();
        }
    }

    private void queueExecutor() {
        if (trace) {
            log.trace("Adding Runner on Executor for delivery");
        }
        this.sessionExecutor.execute(this.runner);
    }

    private void sendCredits(int i) {
        this.channel.send(new SessionConsumerFlowCreditMessage(this.id, i));
    }

    private void waitForOnMessageToComplete(boolean z) {
        if (this.handler == null || !z || Thread.currentThread() == this.onMessageThread) {
            return;
        }
        Future future = new Future();
        this.sessionExecutor.execute(future);
        if (future.await(10000L)) {
            return;
        }
        log.warn("Timed out waiting for handler to complete processing");
    }

    private void checkClosed() throws HornetQException {
        if (this.closed) {
            throw new HornetQException(102, "Consumer is closed");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void callOnMessage() throws Exception {
        ClientMessageInternal removeFirst;
        if (this.closing || this.stopped) {
            return;
        }
        this.session.workDone();
        MessageHandler messageHandler = this.handler;
        if (messageHandler != null) {
            if (this.rateLimiter != null) {
                this.rateLimiter.limit();
            }
            synchronized (this) {
                removeFirst = this.buffer.removeFirst();
            }
            if (removeFirst != null) {
                boolean isExpired = removeFirst.isExpired();
                flowControlBeforeConsumption(removeFirst);
                if (isExpired) {
                    this.session.expire(this.id, removeFirst.getMessageID());
                } else {
                    this.onMessageThread = Thread.currentThread();
                    if (trace) {
                        log.trace("Calling handler.onMessage");
                    }
                    messageHandler.onMessage(removeFirst);
                    if (trace) {
                        log.trace("Handler.onMessage done");
                    }
                    if (removeFirst.isLargeMessage()) {
                        removeFirst.discardLargeBody();
                    }
                }
                if (this.clientWindowSize == 0) {
                    startSlowConsumer();
                }
            }
        }
    }

    private void flowControlBeforeConsumption(ClientMessageInternal clientMessageInternal) throws HornetQException {
        if (clientMessageInternal.getFlowControlSize() != 0) {
            flowControl(clientMessageInternal.getFlowControlSize(), true);
        }
    }

    private void doCleanUp(boolean z) throws HornetQException {
        if (this.closed) {
            return;
        }
        this.closing = true;
        waitForOnMessageToComplete(true);
        if (this.currentLargeMessageBuffer != null) {
            this.currentLargeMessageBuffer.cancel();
            this.currentLargeMessageBuffer = null;
        }
        this.closed = true;
        synchronized (this) {
            if (this.receiverThread != null) {
                notify();
            }
            this.handler = null;
            this.receiverThread = null;
        }
        flushAcks();
        clearBuffer();
        if (z) {
            this.channel.sendBlocking(new SessionConsumerCloseMessage(this.id));
        }
        this.session.removeConsumer(this);
    }

    private void clearBuffer() {
        this.buffer.clear();
    }

    private void doAck(ClientMessageInternal clientMessageInternal) throws HornetQException {
        this.ackBytes = 0;
        this.lastAckedMessage = null;
        this.session.acknowledge(this.id, clientMessageInternal.getMessageID());
    }
}
